#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from datetime import timedelta
from typing import Any, Iterator, Mapping

import pendulum
from boto3 import session as boto3session
from botocore import UNSIGNED
from botocore.config import Config
from botocore.exceptions import ClientError

from airbyte_cdk import AirbyteTracedException, FailureType
from source_s3.s3_utils import make_s3_client

from .s3file import S3File
from .source_files_abstract.file_info import FileInfo
from .source_files_abstract.stream import IncrementalFileStream


class IncrementalFileStreamS3(IncrementalFileStream):
    """Incremental file stream that discovers its files by listing an S3 bucket."""

    @property
    def storagefile_class(self) -> type:
        """Return the storage-file class used by this stream (``S3File``)."""
        return S3File

    def filepath_iterator(self, stream_state: Mapping[str, Any] = None) -> Iterator[FileInfo]:
        """
        List the configured bucket page by page and yield the objects that should be read.

        :param stream_state: current stream state; it is run through ``_get_converted_stream_state`` before filtering
        :yield: FileInfo (key, last_modified, size) for every listed object that is not a folder
            and passes ``_filter_by_last_modified_date``
        :raises AirbyteTracedException: config error wrapping a botocore ``ClientError`` raised while listing
        """
        stream_state = self._get_converted_stream_state(stream_state)
        provider = self._provider

        # A missing *or* explicitly-None prefix is normalised to "" so that the very same value
        # is used for logging and for the list call (list_objects_v2 rejects Prefix=None).
        prefix = provider.get("path_prefix")
        if prefix is None:
            prefix = ""

        msg = f"Iterating S3 bucket '{provider['bucket']}'"
        if prefix != "":
            msg += f" with prefix: '{prefix}' "
        self.logger.info(msg)

        client_config = None
        if S3File.use_aws_account(provider):
            session = boto3session.Session(
                aws_access_key_id=provider["aws_access_key_id"], aws_secret_access_key=provider["aws_secret_access_key"]
            )
        else:
            # Otherwise fall back to a default session that sends unsigned requests.
            session = boto3session.Session()
            client_config = Config(signature_version=UNSIGNED)
        client = make_s3_client(provider, config=client_config, session=session)

        # list_objects_v2 doesn't like a None value for ContinuationToken,
        # so the key is only added once a token has actually been returned.
        list_kwargs = {"Bucket": provider["bucket"], "Prefix": prefix}
        while True:
            try:
                response = client.list_objects_v2(**list_kwargs)
            except ClientError as e:
                message = e.response.get("Error", {}).get("Message") or str(e)
                raise AirbyteTracedException(message, message, failure_type=FailureType.config_error) from e

            # A response without "Contents" is treated as an empty page.
            for file in response.get("Contents", []):
                if self.is_not_folder(file) and self._filter_by_last_modified_date(file, stream_state):
                    yield FileInfo(key=file["Key"], last_modified=file["LastModified"], size=file["Size"])

            ctoken = response.get("NextContinuationToken")
            if not ctoken:
                break
            list_kwargs["ContinuationToken"] = ctoken

    @staticmethod
    def is_not_folder(file: Mapping[str, Any]) -> bool:
        """Return True unless the object's key ends with "/" (i.e. it is a folder placeholder)."""
        return not file["Key"].endswith("/")

    def _filter_by_last_modified_date(self, file: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None) -> bool:
        """
        Decide whether a listed object should be emitted for the given state.

        The object is kept only when all of the following hold:
          * its ``LastModified`` is later than the cursor date (parsed from the state's cursor field,
            or ``self.start_date`` when the state carries no cursor value);
          * it is not both present in the state's history and last modified at or before the state datetime;
          * it is not both absent from the history and older than the state datetime by more than ``self.buffer_days``.

        :param file: one entry of a ``list_objects_v2`` "Contents" list (reads "Key" and "LastModified")
        :param stream_state: converted stream state, may be None or empty
        :return: True if the object should be emitted
        """
        # Treat a missing state like an empty one so the history lookup below cannot fail on None.
        state = stream_state if stream_state is not None else {}
        history = state.get("history", {})
        last_modified = file.get("LastModified")

        # Only parse a cursor that is actually present; otherwise fall back to the configured start date.
        cursor_value = state.get(self.cursor_field)
        cursor_date = pendulum.parse(cursor_value) if cursor_value else self.start_date

        state_datetime = self._get_datetime_from_stream_state(stream_state)

        in_history_and_not_newer_than_state = (
            self.cursor_field in state and last_modified <= state_datetime and self.file_in_history(file["Key"], history)
        )

        not_in_history_and_older_than_buffer = last_modified + timedelta(
            days=self.buffer_days
        ) < state_datetime and not self.file_in_history(file["Key"], history)

        return (
            last_modified > cursor_date
            and not in_history_and_not_newer_than_state
            and not not_in_history_and_older_than_buffer
        )
